1   package org.apache.lucene.index;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.nio.file.DirectoryStream;
23  import java.nio.file.Files;
24  import java.nio.file.Path;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.regex.Pattern;
34  
35  import org.apache.lucene.document.Document;
36  import org.apache.lucene.document.Field;
37  import org.apache.lucene.document.LongField;
38  import org.apache.lucene.document.NumericDocValuesField;
39  import org.apache.lucene.search.IndexSearcher;
40  import org.apache.lucene.search.MatchAllDocsQuery;
41  import org.apache.lucene.search.NumericRangeQuery;
42  import org.apache.lucene.search.ScoreDoc;
43  import org.apache.lucene.search.Sort;
44  import org.apache.lucene.search.SortField;
45  import org.apache.lucene.search.TopDocs;
46  import org.apache.lucene.store.Directory;
47  import org.apache.lucene.store.FSDirectory;
48  import org.apache.lucene.store.IOContext;
49  import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
50  import org.apache.lucene.store.MockDirectoryWrapper;
51  import org.apache.lucene.util.Bits;
52  import org.apache.lucene.util.IOUtils;
53  import org.apache.lucene.util.LuceneTestCase;
54  import org.apache.lucene.util.StringHelper;
55  import org.apache.lucene.util.TestUtil;
56  
57  // TODO:
58  //   - old parallel indices are only pruned on commit/close; can we do it on refresh?
59  
60  /** Simple example showing how to use ParallelLeafReader to index new
61   *  stuff (postings, DVs, etc.) from previously stored fields, on the
62   *  fly (during NRT reader reopen), after the  initial indexing.  The
63   *  test indexes just a single stored field with text "content X" (X is
64   *  a number embedded in the text).
65   *
66   *  Then, on reopen, for any newly created segments (flush or merge), it
67   *  builds a new parallel segment by loading all stored docs, parsing
68   *  out that X, and adding it as DV and numeric indexed (trie) field.
69   *
70   *  Finally, for searching, it builds a top-level MultiReader, with
71   *  ParallelLeafReader for each segment, and then tests that random
72   *  numeric range queries, and sorting by the new DV field, work
73   *  correctly.
74   *
75   *  Each per-segment index lives in a private directory next to the main
76   *  index, and they are deleted once their segments are removed from the
77   *  index.  They are "volatile", meaning if e.g. the index is replicated to
78   *  another machine, it's OK to not copy parallel segments indices,
 *  since they will just be regenerated (at a cost though). */
80  
81  // @SuppressSysoutChecks(bugUrl="we print stuff")
82  
83  public class TestDemoParallelLeafReader extends LuceneTestCase {
84  
85    static final boolean DEBUG = false;
86  
  /** Manages a main index plus per-segment "parallel" indices that are (re)built
   *  on the fly as the schema generation advances.  Subclasses define how a
   *  segment is reindexed ({@link #reindex}) and what the current schema gen is
   *  ({@link #getCurrentSchemaGen}).  Creation/removal of parallel readers is
   *  serialized on {@code this} (see getParallelLeafReader). */
  static abstract class ReindexingReader implements Closeable {

    /** Key used to store the current schema gen in the SegmentInfo diagnostics */
    public final static String SCHEMA_GEN_KEY = "schema_gen";

    public final IndexWriter w;
    public final ReaderManager mgr;

    // Main index directory ("index" under root):
    private final Directory indexDir;
    private final Path root;
    // Parent path holding one sub-directory per (segment id, schema gen) parallel index:
    private final Path segsPath;

    /** Which segments have been closed, but their parallel index has not yet been removed. */
    private final Set<SegmentIDAndGen> closedSegments = Collections.newSetFromMap(new ConcurrentHashMap<SegmentIDAndGen,Boolean>());

    /** Holds currently open parallel readers for each segment. */
    private final Map<SegmentIDAndGen,LeafReader> parallelReaders = new ConcurrentHashMap<>();

    /** Debugging aid: dumps the refCount of every currently cached parallel reader. */
    void printRefCounts() {
      System.out.println("All refCounts:");
      for(Map.Entry<SegmentIDAndGen,LeafReader> ent : parallelReaders.entrySet()) {
        System.out.println("  " + ent.getKey() + " " + ent.getValue() + " refCount=" + ent.getValue().getRefCount());
      }
    }

    public ReindexingReader(Path root) throws IOException {
      this.root = root;

      // Normal index is stored under "index":
      indexDir = openDirectory(root.resolve("index"));

      // Per-segment parallel indices are stored under subdirs "segs":
      segsPath = root.resolve("segs");
      Files.createDirectories(segsPath);

      // Wrap the subclass-provided merge policy so merges pull fields from the parallel readers:
      IndexWriterConfig iwc = getIndexWriterConfig();
      iwc.setMergePolicy(new ReindexingMergePolicy(iwc.getMergePolicy()));
      if (DEBUG) {
        System.out.println("TEST: use IWC:\n" + iwc);
      }
      w = new IndexWriter(indexDir, iwc);

      w.getConfig().setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
          @Override
          public void warm(LeafReader reader) throws IOException {
            // This will build the parallel index for the merged segment before the merge becomes visible, so reopen delay is only due to
            // newly flushed segments:
            if (DEBUG) System.out.println(Thread.currentThread().getName() +": TEST: now warm " + reader);
            // TODO: it's not great that we pass false here; it means we close the reader & reopen again for NRT reader; still we did "warm" by
            // building the parallel index, if necessary
            getParallelLeafReader(reader, false, getCurrentSchemaGen());
          }
        });

      // start with empty commit:
      w.commit();
      mgr = new ReaderManager(new ParallelLeafDirectoryReader(DirectoryReader.open(w, true)));
    }

    protected abstract IndexWriterConfig getIndexWriterConfig() throws IOException;

    /** Optional method to validate that the provided parallel reader in fact reflects the changes in schemaGen. */
    protected void checkParallelReader(LeafReader reader, LeafReader parallelReader, long schemaGen) throws IOException {
    }

    /** Override to customize Directory impl. */
    protected Directory openDirectory(Path path) throws IOException {
      return FSDirectory.open(path);
    }

    public void commit() throws IOException {
      w.commit();
    }
    
    /** Returns {@code reader} itself if it is already current for {@code schemaGen}, else a
     *  ParallelLeafReader joining it with its (possibly freshly built) parallel index. */
    LeafReader getCurrentReader(LeafReader reader, long schemaGen) throws IOException {
      LeafReader parallelReader = getParallelLeafReader(reader, true, schemaGen);
      if (parallelReader != null) {

        // We should not be embedding one ParallelLeafReader inside another:
        assertFalse(parallelReader instanceof ParallelLeafReader);
        assertFalse(reader instanceof ParallelLeafReader);

        // NOTE: important that parallelReader is first, so if there are field name overlaps, because changes to the schema
        // overwrote existing field names, it wins:
        LeafReader newReader = new ParallelLeafReader(false, parallelReader, reader) {
          // Deletions live only in the main (second) reader; the parallel index has no deletes:
          @Override
          public Bits getLiveDocs() {
            return getParallelReaders()[1].getLiveDocs();
          }
          @Override
          public int numDocs() {
            return getParallelReaders()[1].numDocs();
          }
        };

        // Because ParallelLeafReader does its own (extra) incRef:
        parallelReader.decRef();

        return newReader;

      } else {
        // This segment was already current as of currentSchemaGen:
        return reader;
      }
    }

    private class ParallelLeafDirectoryReader extends FilterDirectoryReader {
      public ParallelLeafDirectoryReader(DirectoryReader in) throws IOException {
        super(in, new FilterDirectoryReader.SubReaderWrapper() {
            // Capture the gen once so all leaves of this reader are wrapped consistently:
            final long currentSchemaGen = getCurrentSchemaGen();
            @Override
            public LeafReader wrap(LeafReader reader) {
              try {
                return getCurrentReader(reader, currentSchemaGen);
              } catch (IOException ioe) {
                // TODO: must close on exc here:
                throw new RuntimeException(ioe);
              }
            }
          });
      }

      @Override
      protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
        return new ParallelLeafDirectoryReader(in);
      }

      @Override
      protected void doClose() throws IOException {
        Throwable firstExc = null;
        for (final LeafReader r : getSequentialSubReaders()) {
          if (r instanceof ParallelLeafReader) {
            // try to close each reader, even if an exception is thrown
            try {
              r.decRef();
            } catch (Throwable t) {
              if (firstExc == null) {
                firstExc = t;
              }
            }
          }
        }
        // Also close in, so it decRef's the SegmentInfos
        try {
          in.doClose();
        } catch (Throwable t) {
          if (firstExc == null) {
            firstExc = t;
          }
        }
        // throw the first exception
        IOUtils.reThrow(firstExc);
      }
    }

    @Override
    public void close() throws IOException {
      w.close();
      if (DEBUG) System.out.println("TEST: after close writer index=" + SegmentInfos.readLatestCommit(indexDir));

      /*
      DirectoryReader r = mgr.acquire();
      try {
        TestUtil.checkReader(r);
      } finally {
        mgr.release(r);
      }
      */
      mgr.close();
      // On close we may prune even old-gen parallel indices (removeOldGens=true):
      pruneOldSegments(true);
      assertNoExtraSegments();
      indexDir.close();
    }

    // Make sure we deleted all parallel indices for segments that are no longer in the main index: 
    private void assertNoExtraSegments() throws IOException {
      Set<String> liveIDs = new HashSet<String>();
      for(SegmentCommitInfo info : SegmentInfos.readLatestCommit(indexDir)) {
        String idString = StringHelper.idToString(info.info.getId());
        liveIDs.add(idString);
      }

      // At this point (closing) the only segments in closedSegments should be the still-live ones:
      for(SegmentIDAndGen segIDGen : closedSegments) {
        assertTrue(liveIDs.contains(segIDGen.segID));
      }

      boolean fail = false;
      for(Path path : segSubDirs(segsPath)) {
        SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
        if (liveIDs.contains(segIDGen.segID) == false) {
          if (DEBUG) System.out.println("TEST: fail seg=" + path.getFileName() + " is not live but still has a parallel index");
          fail = true;
        }
      }
      assertFalse(fail);
    }

    /** Immutable key: segment id + schema gen; its toString() ("segID_gen") doubles as
     *  the parallel index's sub-directory name, and the String constructor parses that form back. */
    private static class SegmentIDAndGen {
      public final String segID;
      public final long schemaGen;

      public SegmentIDAndGen(String segID, long schemaGen) {
        this.segID = segID;
        this.schemaGen = schemaGen;
      }

      public SegmentIDAndGen(String s) {
        String[] parts = s.split("_");
        if (parts.length != 2) {
          throw new IllegalArgumentException("invalid SegmentIDAndGen \"" + s + "\"");
        }
        // TODO: better checking of segID?
        segID = parts[0];
        schemaGen = Long.parseLong(parts[1]);
      }

      @Override
      public int hashCode() {
        // NOTE(review): when schemaGen == 0 this collapses to 0 for every key, degrading hashed
        // lookups to linear scans; still consistent with equals, so correct, just slower:
        return (int) (segID.hashCode() * schemaGen);
      }

      @Override
      public boolean equals(Object _other) {
        if (_other instanceof SegmentIDAndGen) {
          SegmentIDAndGen other = (SegmentIDAndGen) _other;
          return segID.equals(other.segID) && schemaGen == other.schemaGen;
        } else {
          return false;
        }
      }

      @Override
      public String toString() {
        return segID + "_" + schemaGen;
      }
    }

    /** Closed-listener: when a cached parallel reader is finally released, un-cache it, close its
     *  Directory, and mark the segment as closed so pruning may later remove the files. */
    private class ParallelReaderClosed implements LeafReader.ReaderClosedListener {
      private final SegmentIDAndGen segIDGen;
      private final Directory dir;

      public ParallelReaderClosed(SegmentIDAndGen segIDGen, Directory dir) {
        this.segIDGen = segIDGen;
        this.dir = dir;
      }

      @Override
      public void onClose(IndexReader ignored) {
        try {
          // TODO: make this sync finer, i.e. just the segment + schemaGen
          synchronized(ReindexingReader.this) {
            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now close parallel parLeafReader dir=" + dir + " segIDGen=" + segIDGen);
            parallelReaders.remove(segIDGen);
            dir.close();
            closedSegments.add(segIDGen);
          }
        } catch (IOException ioe) {
          System.out.println("TEST: hit IOExc closing dir=" + dir);
          ioe.printStackTrace(System.out);
          throw new RuntimeException(ioe);
        }
      }
    }

    // Returns a ref
    LeafReader getParallelLeafReader(final LeafReader leaf, boolean doCache, long schemaGen) throws IOException {
      assert leaf instanceof SegmentReader;
      SegmentInfo info = ((SegmentReader) leaf).getSegmentInfo().info;

      long infoSchemaGen = getSchemaGen(info);

      if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: getParallelLeafReader: " + leaf + " infoSchemaGen=" + infoSchemaGen + " vs schemaGen=" + schemaGen + " doCache=" + doCache);

      if (infoSchemaGen == schemaGen) {
        if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: segment is already current schemaGen=" + schemaGen + "; skipping");
        return null;
      }

      if (infoSchemaGen > schemaGen) {
        throw new IllegalStateException("segment infoSchemaGen (" + infoSchemaGen + ") cannot be greater than requested schemaGen (" + schemaGen + ")");
      }

      final SegmentIDAndGen segIDGen = new SegmentIDAndGen(StringHelper.idToString(info.getId()), schemaGen);

      // While loop because the parallel reader may be closed out from under us, so we must retry:
      while (true) {

        // TODO: make this sync finer, i.e. just the segment + schemaGen
        synchronized (this) {
          LeafReader parReader = parallelReaders.get(segIDGen);
      
          assert doCache || parReader == null;

          if (parReader == null) {

            Path leafIndex = segsPath.resolve(segIDGen.toString());

            final Directory dir = openDirectory(leafIndex);

            // The "done" marker file tells us a previous run finished building this parallel index:
            if (Files.exists(leafIndex.resolve("done")) == false) {
              if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: build segment index for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);

              if (dir.listAll().length != 0) {
                // It crashed before finishing last time:
                if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: remove old incomplete index files: " + leafIndex);
                IOUtils.rm(leafIndex);
              }

              reindex(infoSchemaGen, schemaGen, leaf, dir);

              // Marker file, telling us this index is in fact done.  This way if we crash while doing the reindexing for a given segment, we will
              // later try again:
              dir.createOutput("done", IOContext.DEFAULT).close();
            } else {
              if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: segment index already exists for " + leaf + " " + segIDGen + " (source: " + info.getDiagnostics().get("source") + ") dir=" + leafIndex);
            }

            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check index " + dir);
            //TestUtil.checkIndex(dir);

            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            final LeafReader parLeafReader;
            if (infos.size() == 1) {
              parLeafReader = new SegmentReader(infos.info(0), IOContext.DEFAULT);
            } else {
              // This just means we didn't forceMerge above:
              parLeafReader = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(dir));
            }

            //checkParallelReader(leaf, parLeafReader, schemaGen);

            if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: opened parallel reader: " + parLeafReader);
            if (doCache) {
              parallelReaders.put(segIDGen, parLeafReader);

              // Our id+gen could have been previously closed, e.g. if it was a merged segment that was warmed, so we must clear this else
              // the pruning may remove our directory:
              closedSegments.remove(segIDGen);

              parLeafReader.addReaderClosedListener(new ParallelReaderClosed(segIDGen, dir));

            } else {
              // Used only for merged segment warming:
              // Messy: we close this reader now, instead of leaving open for reuse:
              if (DEBUG) System.out.println("TEST: now decRef non cached refCount=" + parLeafReader.getRefCount());
              parLeafReader.decRef();
              dir.close();

              // Must do this after dir is closed, else another thread could "rm -rf" while we are closing (which makes MDW.close's
              // checkIndex angry):
              closedSegments.add(segIDGen);
              // NOTE(review): dead store -- parReader is unconditionally reassigned just below, so this
              // path returns the already-decRef'd reader; the warming caller ignores the return value:
              parReader = null;
            }
            parReader = parLeafReader;

          } else {
            if (parReader.tryIncRef() == false) {
              // We failed: this reader just got closed by another thread, e.g. refresh thread opening a new reader, so this reader is now
              // closed and we must try again.
              if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: tryIncRef failed for " + parReader + "; retry");
              parReader = null;
              continue;
            }
            if (DEBUG) System.out.println(Thread.currentThread().getName()+ ": TEST: use existing already opened parReader=" + parReader + " refCount=" + parReader.getRefCount());
            //checkParallelReader(leaf, parReader, schemaGen);
          }

          // We return the new reference to caller
          return parReader;
        }
      }
    }

    // TODO: we could pass a writer already opened...?
    protected abstract void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException;

    /** Returns the gen for the current schema. */
    protected abstract long getCurrentSchemaGen();

    /** Returns the gen that should be merged, meaning those changes will be folded back into the main index. */
    protected long getMergingSchemaGen() {
      return getCurrentSchemaGen();
    }

    /** Removes the parallel index that are no longer in the last commit point.  We can't
     *  remove this when the parallel reader is closed because it may still be referenced by
     *  the last commit. */
    private void pruneOldSegments(boolean removeOldGens) throws IOException {
      SegmentInfos lastCommit = SegmentInfos.readLatestCommit(indexDir);
      if (DEBUG) System.out.println("TEST: prune");

      Set<String> liveIDs = new HashSet<String>();
      for(SegmentCommitInfo info : lastCommit) {
        String idString = StringHelper.idToString(info.info.getId());
        liveIDs.add(idString);
      }

      long currentSchemaGen = getCurrentSchemaGen();

      if (Files.exists(segsPath)) {
        for (Path path : segSubDirs(segsPath)) {
          if (Files.isDirectory(path)) {
            SegmentIDAndGen segIDGen = new SegmentIDAndGen(path.getFileName().toString());
            assert segIDGen.schemaGen <= currentSchemaGen;
            // Delete only when the segment is gone from the commit AND either its reader was
            // closed, or (when allowed) its schema gen is stale:
            if (liveIDs.contains(segIDGen.segID) == false && (closedSegments.contains(segIDGen) || (removeOldGens && segIDGen.schemaGen < currentSchemaGen))) {
              if (DEBUG) System.out.println("TEST: remove " + segIDGen);
              try {
                IOUtils.rm(path);
                closedSegments.remove(segIDGen);
              } catch (IOException ioe) {
                // OK, we'll retry later
                if (DEBUG) System.out.println("TEST: ignore ioe during delete " + path + ":" + ioe);
              }
            }
          }
        }
      }
    }

    /** Just replaces the sub-readers with parallel readers, so reindexed fields are merged into new segments. */
    private class ReindexingMergePolicy extends MergePolicy {

      class ReindexingOneMerge extends OneMerge {

        // Lazily built (in getMergeReaders) parallel-wrapped views of the segments being merged:
        List<LeafReader> parallelReaders;
        final long schemaGen;

        ReindexingOneMerge(List<SegmentCommitInfo> segments) {
          super(segments);
          // Commit up front to which schemaGen we will merge; we don't want a schema change sneaking in for some of our leaf readers but not others:
          schemaGen = getMergingSchemaGen();
          long currentSchemaGen = getCurrentSchemaGen();

          // Defensive sanity check:
          if (schemaGen > currentSchemaGen) {
            throw new IllegalStateException("currentSchemaGen (" + currentSchemaGen + ") must always be >= mergingSchemaGen (" + schemaGen + ")");
          }
        }

        @Override
        public List<CodecReader> getMergeReaders() throws IOException {
          if (parallelReaders == null) {
            parallelReaders = new ArrayList<>();
            for (CodecReader reader : super.getMergeReaders()) {
              parallelReaders.add(getCurrentReader((SegmentReader)reader, schemaGen));
            }
          }

          // TODO: fix ParallelLeafReader, if this is a good use case
          List<CodecReader> mergeReaders = new ArrayList<>();
          for (LeafReader reader : parallelReaders) {
            mergeReaders.add(SlowCodecReaderWrapper.wrap(reader));
          }
          return mergeReaders;
        }

        @Override
        public void mergeFinished() throws IOException {
          Throwable th = null;
          for(LeafReader r : parallelReaders) {
            if (r instanceof ParallelLeafReader) {
              try {
                r.decRef();
              } catch (Throwable t) {
                if (th == null) {
                  th = t;
                }
              }
            }
          }

          // If any error occurred, throw it.
          IOUtils.reThrow(th);
        }
    
        @Override
        public void setMergeInfo(SegmentCommitInfo info) {
          // Record that this merged segment is current as of this schemaGen:
          info.info.getDiagnostics().put(SCHEMA_GEN_KEY, Long.toString(schemaGen));
          super.setMergeInfo(info);
        }

        @Override
        public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
          return super.getDocMap(mergeState);
        }
      }

      class ReindexingMergeSpecification extends MergeSpecification {
        @Override
        public void add(OneMerge merge) {
          super.add(new ReindexingOneMerge(merge.segments));
        }

        @Override
        public String segString(Directory dir) {
          return "ReindexingMergeSpec(" + super.segString(dir) + ")";
        }
      }

      /** Re-packages each OneMerge from the wrapped policy as a ReindexingOneMerge; null-safe. */
      MergeSpecification wrap(MergeSpecification spec) {
        MergeSpecification wrapped = null;
        if (spec != null) {
          wrapped = new ReindexingMergeSpecification();
          for (OneMerge merge : spec.merges) {
            wrapped.add(merge);
          }
        }
        return wrapped;
      }

      final MergePolicy in;

      /** Create a new {@code MergePolicy} that wraps (and delegates merge selection to) {@code in}. */
      public ReindexingMergePolicy(MergePolicy in) {
        this.in = in;
      }

      @Override
      public MergeSpecification findMerges(MergeTrigger mergeTrigger,
                                           SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
        return wrap(in.findMerges(mergeTrigger, segmentInfos, writer));
      }

      @Override
      public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
                                                 int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
        throws IOException {
        // TODO: do we need to force-force this?  Ie, wrapped MP may think index is already optimized, yet maybe its schemaGen is old?  need test!
        return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
      }

      @Override
      public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
        throws IOException {
        return wrap(in.findForcedDeletesMerges(segmentInfos, writer));
      }

      @Override
      public boolean useCompoundFile(SegmentInfos segments,
                                     SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
        return in.useCompoundFile(segments, newSegment, writer);
      }

      @Override
      public String toString() {
        return "ReindexingMergePolicy(" + in + ")";
      }
    }

    /** Reads the schema gen recorded in the segment's diagnostics; -1 means the segment
     *  pre-dates any reindexing (no gen was ever recorded). */
    static long getSchemaGen(SegmentInfo info) {
      String s = info.getDiagnostics().get(SCHEMA_GEN_KEY);
      if (s == null) {
        return -1;
      } else {
        return Long.parseLong(s);
      }
    }
  }
647 
648   private ReindexingReader getReindexer(Path root) throws IOException {
649     return new ReindexingReader(root) {
650       @Override
651       protected IndexWriterConfig getIndexWriterConfig() throws IOException {
652         IndexWriterConfig iwc = newIndexWriterConfig();
653         TieredMergePolicy tmp = new TieredMergePolicy();
654         // We write tiny docs, so we need tiny floor to avoid O(N^2) merging:
655         tmp.setFloorSegmentMB(.01);
656         iwc.setMergePolicy(tmp);
657         return iwc;
658       }
659 
660       @Override
661       protected Directory openDirectory(Path path) throws IOException {
662         MockDirectoryWrapper dir = newMockFSDirectory(path);
663         dir.setUseSlowOpenClosers(false);
664         dir.setThrottling(Throttling.NEVER);
665         return dir;
666       }
667 
668       @Override
669       protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
670         IndexWriterConfig iwc = newIndexWriterConfig();
671 
672         // The order of our docIDs must precisely matching incoming reader:
673         iwc.setMergePolicy(new LogByteSizeMergePolicy());
674         IndexWriter w = new IndexWriter(parallelDir, iwc);
675         int maxDoc = reader.maxDoc();
676 
677         // Slowly parse the stored field into a new doc values field:
678         for(int i=0;i<maxDoc;i++) {
679           // TODO: is this still O(blockSize^2)?
680           Document oldDoc = reader.document(i);
681           Document newDoc = new Document();
682           long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
683           newDoc.add(new NumericDocValuesField("number", value));
684           newDoc.add(new LongField("number", value, Field.Store.NO));
685           w.addDocument(newDoc);
686         }
687 
688         if (random().nextBoolean()) {
689           w.forceMerge(1);
690         }
691 
692         w.close();
693       }
694 
695       @Override
696       protected long getCurrentSchemaGen() {
697         return 0;
698       }
699     };
700   }
701 
702   /** Schema change by adding a new number_<schemaGen> DV field each time. */
  private ReindexingReader getReindexerNewDVFields(Path root, final AtomicLong currentSchemaGen) throws IOException {
    // Reindexer whose "schema change" adds a brand new "number_<gen>" DV field per
    // generation; the current generation is supplied externally via currentSchemaGen.
    return new ReindexingReader(root) {
      @Override
      protected IndexWriterConfig getIndexWriterConfig() throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TieredMergePolicy tmp = new TieredMergePolicy();
        // We write tiny docs, so we need tiny floor to avoid O(N^2) merging:
        tmp.setFloorSegmentMB(.01);
        iwc.setMergePolicy(tmp);
        return iwc;
      }

      @Override
      protected Directory openDirectory(Path path) throws IOException {
        // Disable slow open/close emulation and throttling; they only slow this test down:
        MockDirectoryWrapper dir = newMockFSDirectory(path);
        dir.setUseSlowOpenClosers(false);
        dir.setThrottling(Throttling.NEVER);
        return dir;
      }

      @Override
      protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
        IndexWriterConfig iwc = newIndexWriterConfig();

        // The order of our docIDs must precisely match the incoming reader:
        iwc.setMergePolicy(new LogByteSizeMergePolicy());
        IndexWriter w = new IndexWriter(parallelDir, iwc);
        int maxDoc = reader.maxDoc();

        if (oldSchemaGen <= 0) {
          // First reindex of this segment: must slowly parse the stored field into a new doc values field:
          for(int i=0;i<maxDoc;i++) {
            // TODO: is this still O(blockSize^2)?
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            // Stored "text" field has the form "number <value>":
            long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, value));
            // Also index the raw value so numeric range queries work:
            newDoc.add(new LongField("number", value, Field.Store.NO));
            w.addDocument(newDoc);
          }
        } else {
          // Just carry over doc values from previous field:
          NumericDocValues oldValues = reader.getNumericDocValues("number_" + oldSchemaGen);
          assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
          for(int i=0;i<maxDoc;i++) {
            // TODO: is this still O(blockSize^2)?
            // NOTE(review): oldDoc is fetched but unused in this branch — possibly
            // intentional (exercises stored-field reads during reindex); confirm before removing.
            Document oldDoc = reader.document(i);
            Document newDoc = new Document();
            newDoc.add(new NumericDocValuesField("number_" + newSchemaGen, oldValues.get(i)));
            w.addDocument(newDoc);
          }
        }

        // Randomly force-merge the parallel index to vary segment geometry:
        if (random().nextBoolean()) {
          w.forceMerge(1);
        }

        w.close();
      }

      @Override
      protected long getCurrentSchemaGen() {
        return currentSchemaGen.get();
      }

      @Override
      protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
        // Verify the parallel reader's per-gen DV field agrees with the value
        // encoded in each doc's stored "text" field.
        String fieldName = "number_" + schemaGen;
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs field=" + fieldName + " r=" + r + " parR=" + parR);
        NumericDocValues numbers = parR.getNumericDocValues(fieldName);
        if (numbers == null) {
          // Parallel segment for this gen may not exist; nothing to check.
          return;
        }
        int maxDoc = r.maxDoc();
        boolean failed = false;
        for(int i=0;i<maxDoc;i++) {
          Document oldDoc = r.document(i);
          long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
          if (value != numbers.get(i)) {
            if (DEBUG) System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
            failed = true;
          } else if (failed) {
            // After the first mismatch, also print matching docs to bracket the bad range:
            if (DEBUG) System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
          }
        }
        assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
      }
    };
  }
792 
  /** Schema change by changing how the same "number" DV field is indexed. */
794   private ReindexingReader getReindexerSameDVField(Path root, final AtomicLong currentSchemaGen, final AtomicLong mergingSchemaGen) throws IOException {
795     return new ReindexingReader(root) {
796       @Override
797       protected IndexWriterConfig getIndexWriterConfig() throws IOException {
798         IndexWriterConfig iwc = newIndexWriterConfig();
799         TieredMergePolicy tmp = new TieredMergePolicy();
800         // We write tiny docs, so we need tiny floor to avoid O(N^2) merging:
801         tmp.setFloorSegmentMB(.01);
802         iwc.setMergePolicy(tmp);
803         if (TEST_NIGHTLY) {
804           // during nightly tests, we might use too many files if we arent careful
805           iwc.setUseCompoundFile(true);
806         }
807         return iwc;
808       }
809 
810       @Override
811       protected Directory openDirectory(Path path) throws IOException {
812         MockDirectoryWrapper dir = newMockFSDirectory(path);
813         dir.setUseSlowOpenClosers(false);
814         dir.setThrottling(Throttling.NEVER);
815         return dir;
816       }
817 
818       @Override
819       protected void reindex(long oldSchemaGen, long newSchemaGen, LeafReader reader, Directory parallelDir) throws IOException {
820         IndexWriterConfig iwc = newIndexWriterConfig();
821 
822         // The order of our docIDs must precisely matching incoming reader:
823         iwc.setMergePolicy(new LogByteSizeMergePolicy());
824         IndexWriter w = new IndexWriter(parallelDir, iwc);
825         int maxDoc = reader.maxDoc();
826 
827         if (oldSchemaGen <= 0) {
828           // Must slowly parse the stored field into a new doc values field:
829           for(int i=0;i<maxDoc;i++) {
830             // TODO: is this still O(blockSize^2)?
831             Document oldDoc = reader.document(i);
832             Document newDoc = new Document();
833             long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
834             newDoc.add(new NumericDocValuesField("number", newSchemaGen*value));
835             newDoc.add(new LongField("number", value, Field.Store.NO));
836             w.addDocument(newDoc);
837           }
838         } else {
839           // Just carry over doc values from previous field:
840           NumericDocValues oldValues = reader.getNumericDocValues("number");
841           assertNotNull("oldSchemaGen=" + oldSchemaGen, oldValues);
842           for(int i=0;i<maxDoc;i++) {
843             // TODO: is this still O(blockSize^2)?
844             Document oldDoc = reader.document(i);
845             Document newDoc = new Document();
846             newDoc.add(new NumericDocValuesField("number", newSchemaGen*(oldValues.get(i)/oldSchemaGen)));
847             w.addDocument(newDoc);
848           }
849         }
850 
851         if (random().nextBoolean()) {
852           w.forceMerge(1);
853         }
854 
855         w.close();
856       }
857 
858       @Override
859       protected long getCurrentSchemaGen() {
860         return currentSchemaGen.get();
861       }
862 
863       @Override
864       protected long getMergingSchemaGen() {
865         return mergingSchemaGen.get();
866       }
867 
868       @Override
869       protected void checkParallelReader(LeafReader r, LeafReader parR, long schemaGen) throws IOException {
870         if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: now check parallel number DVs r=" + r + " parR=" + parR);
871         NumericDocValues numbers = parR.getNumericDocValues("numbers");
872         if (numbers == null) {
873           return;
874         }
875         int maxDoc = r.maxDoc();
876         boolean failed = false;
877         for(int i=0;i<maxDoc;i++) {
878           Document oldDoc = r.document(i);
879           long value = Long.parseLong(oldDoc.get("text").split(" ")[1]);
880           value *= schemaGen;
881           if (value != numbers.get(i)) {
882             System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
883             failed = true;
884           } else if (failed) {
885             System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
886           }
887         }
888         assertFalse("FAILED r=" + r, failed);
889       }
890     };
891   }
892 
893   public void testBasicMultipleSchemaGens() throws Exception {
894 
895     AtomicLong currentSchemaGen = new AtomicLong();
896 
897     // TODO: separate refresh thread, search threads, indexing threads
898     ReindexingReader reindexer = getReindexerNewDVFields(createTempDir(), currentSchemaGen);
899     reindexer.commit();
900 
901     Document doc = new Document();
902     doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
903     reindexer.w.addDocument(doc);
904 
905     if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: refresh @ 1 doc");
906     reindexer.mgr.maybeRefresh();
907     DirectoryReader r = reindexer.mgr.acquire();
908     if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: got reader=" + r);
909     try {
910       checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
911     } finally {
912       reindexer.mgr.release(r);
913     }
914     //reindexer.printRefCounts();
915 
916     currentSchemaGen.incrementAndGet();
917 
918     if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: increment schemaGen");
919     if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST: commit");
920     reindexer.commit();
921 
922     doc = new Document();
923     doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
924     reindexer.w.addDocument(doc);
925 
926     if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
927     reindexer.mgr.maybeRefresh();
928     //reindexer.printRefCounts();
929     r = reindexer.mgr.acquire();
930     if (DEBUG) System.out.println("TEST: got reader=" + r);
931     try {
932       checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
933     } finally {
934       reindexer.mgr.release(r);
935     }
936 
937     if (DEBUG) System.out.println("TEST: forceMerge");
938     reindexer.w.forceMerge(1);
939 
940     currentSchemaGen.incrementAndGet();
941 
942     if (DEBUG) System.out.println("TEST: commit");
943     reindexer.commit();
944 
945     if (DEBUG) System.out.println("TEST: refresh after forceMerge");
946     reindexer.mgr.maybeRefresh();
947     r = reindexer.mgr.acquire();
948     if (DEBUG) System.out.println("TEST: got reader=" + r);
949     try {
950       checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
951     } finally {
952       reindexer.mgr.release(r);
953     }
954 
955     if (DEBUG) System.out.println("TEST: close writer");
956     reindexer.close();
957   }
958 
  public void testRandomMultipleSchemaGens() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();
    ReindexingReader reindexer = null;

    // TODO: separate refresh thread, search threads, indexing threads
    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    Path root = createTempDir();
    // Both intervals grow by 1.25x each time they fire, so refresh/commit/close
    // happen proportionally less often as the index grows:
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        // (Re)open after a random close below; parallel segments must survive reopen:
        reindexer = getReindexerNewDVFields(root, currentSchemaGen);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a doc
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      // Stored "text" field carries the expected numeric value as "number <value>":
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }
      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();

        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
        try {
          // Verify the per-gen DV field matches each doc's stored value:
          checkAllNumberDVs(r, "number_" + currentSchemaGen.get(), true, 1);
        } finally {
          reindexer.mgr.release(r);
        }
        if (DEBUG) reindexer.printRefCounts();
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (random().nextInt(500) == 17) {
        // Occasionally advance the schema generation, forcing reindex of all segments:
        currentSchemaGen.incrementAndGet();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
        reindexer.commit();
        //reindexer.printRefCounts();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }

    if (reindexer != null) {
      reindexer.close();
    }
  }
1039 
1040   /** First schema change creates a new "number" DV field off the stored field; subsequent changes just change the value of that number
1041    *  field for all docs. */
  public void testRandomMultipleSchemaGensSameField() throws Exception {

    AtomicLong currentSchemaGen = new AtomicLong();
    // mergingSchemaGen lags currentSchemaGen; merges only use gens up to it:
    AtomicLong mergingSchemaGen = new AtomicLong();

    ReindexingReader reindexer = null;

    // TODO: separate refresh thread, search threads, indexing threads
    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    Path root = createTempDir();
    // Both intervals grow by 1.25x each time they fire, so refresh/commit/close
    // happen proportionally less often as the index grows:
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;

    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: open new reader/writer");
        reindexer = getReindexerSameDVField(root, currentSchemaGen, mergingSchemaGen);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a doc
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      // Bounded values so value*schemaGen cannot overflow a long:
      doc.add(newTextField("text", "number " + TestUtil.nextInt(random(), -10000, 10000), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }
      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();
        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: got reader=" + r);
        try {
          // Expected DV value = stored number * current schema gen:
          checkAllNumberDVs(r, "number", true, (int) currentSchemaGen.get());
        } finally {
          reindexer.mgr.release(r);
        }
        if (DEBUG) reindexer.printRefCounts();
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (random().nextInt(500) == 17) {
        // Occasionally advance the schema generation; sometimes also let merges catch up:
        currentSchemaGen.incrementAndGet();
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance schemaGen to " + currentSchemaGen);
        if (random().nextBoolean()) {
          mergingSchemaGen.incrementAndGet();
          if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: advance mergingSchemaGen to " + mergingSchemaGen);
        }
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: commit @ " + (i+1) + " docs");
        reindexer.commit();
        //reindexer.printRefCounts();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println(Thread.currentThread().getName() + ": TEST TOP: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }

    if (reindexer != null) {
      reindexer.close();
    }

    // Verify main index never reflects schema changes beyond mergingSchemaGen:
    // the DV value must be a multiple of the stored value with a factor no
    // greater than the final mergingSchemaGen.
    try (Directory dir = newFSDirectory(root.resolve("index"));
         IndexReader r = DirectoryReader.open(dir)) {
        for (LeafReaderContext ctx : r.leaves()) {
          LeafReader leaf = ctx.reader();
          NumericDocValues numbers = leaf.getNumericDocValues("number");
          if (numbers != null) {
            int maxDoc = leaf.maxDoc();
            for(int i=0;i<maxDoc;i++) {
              Document doc = leaf.document(i);
              long value = Long.parseLong(doc.get("text").split(" ")[1]);
              long dvValue = numbers.get(i);
              if (value == 0) {
                // 0 * any gen is still 0; the factor is unrecoverable:
                assertEquals(0, dvValue);
              } else {
                assertTrue(dvValue % value == 0);
                assertTrue(dvValue / value <= mergingSchemaGen.get());
              }
            }
          }
        }
      }
  }
1152 
1153   public void testBasic() throws Exception {
1154     ReindexingReader reindexer = getReindexer(createTempDir());
1155 
1156     // Start with initial empty commit:
1157     reindexer.commit();
1158 
1159     Document doc = new Document();
1160     doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
1161     reindexer.w.addDocument(doc);
1162 
1163     if (DEBUG) System.out.println("TEST: refresh @ 1 doc");
1164     reindexer.mgr.maybeRefresh();
1165     DirectoryReader r = reindexer.mgr.acquire();
1166     if (DEBUG) System.out.println("TEST: got reader=" + r);
1167     try {
1168       checkAllNumberDVs(r);
1169       IndexSearcher s = newSearcher(r);
1170       testNumericDVSort(s);
1171       testNumericRangeQuery(s);
1172     } finally {
1173       reindexer.mgr.release(r);
1174     }
1175     //reindexer.printRefCounts();
1176 
1177     if (DEBUG) System.out.println("TEST: commit");
1178     reindexer.commit();
1179 
1180     doc = new Document();
1181     doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
1182     reindexer.w.addDocument(doc);
1183 
1184     if (DEBUG) System.out.println("TEST: refresh @ 2 docs");
1185     reindexer.mgr.maybeRefresh();
1186     //reindexer.printRefCounts();
1187     r = reindexer.mgr.acquire();
1188     if (DEBUG) System.out.println("TEST: got reader=" + r);
1189     try {
1190       checkAllNumberDVs(r);
1191       IndexSearcher s = newSearcher(r);
1192       testNumericDVSort(s);
1193       testNumericRangeQuery(s);
1194     } finally {
1195       reindexer.mgr.release(r);
1196     }
1197 
1198     if (DEBUG) System.out.println("TEST: forceMerge");
1199     reindexer.w.forceMerge(1);
1200 
1201     if (DEBUG) System.out.println("TEST: commit");
1202     reindexer.commit();
1203 
1204     if (DEBUG) System.out.println("TEST: refresh after forceMerge");
1205     reindexer.mgr.maybeRefresh();
1206     r = reindexer.mgr.acquire();
1207     if (DEBUG) System.out.println("TEST: got reader=" + r);
1208     try {
1209       checkAllNumberDVs(r);
1210       IndexSearcher s = newSearcher(r);
1211       testNumericDVSort(s);
1212       testNumericRangeQuery(s);
1213     } finally {
1214       reindexer.mgr.release(r);
1215     }
1216 
1217     if (DEBUG) System.out.println("TEST: close writer");
1218     reindexer.close();
1219   }
1220 
  public void testRandom() throws Exception {
    Path root = createTempDir();
    ReindexingReader reindexer = null;

    // TODO: separate refresh thread, search threads, indexing threads
    int numDocs = atLeast(TEST_NIGHTLY ? 20000 : 1000);
    int maxID = 0;
    // Both intervals grow by 1.25x each time they fire, so refresh/commit/close
    // happen proportionally less often as the index grows:
    int refreshEveryNumDocs = 100;
    int commitCloseNumDocs = 1000;
    for(int i=0;i<numDocs;i++) {
      if (reindexer == null) {
        // (Re)open after a random close below; parallel segments must survive reopen:
        reindexer = getReindexer(root);
      }

      Document doc = new Document();
      String id;
      String updateID;
      if (maxID > 0 && random().nextInt(10) == 7) {
        // Replace a doc
        id = "" + random().nextInt(maxID);
        updateID = id;
      } else {
        id = "" + (maxID++);
        updateID = null;
      }

      doc.add(newStringField("id", id, Field.Store.NO));
      // Stored "text" field carries the expected numeric value as "number <value>":
      doc.add(newTextField("text", "number " + random().nextLong(), Field.Store.YES));
      if (updateID == null) {
        reindexer.w.addDocument(doc);
      } else {
        reindexer.w.updateDocument(new Term("id", updateID), doc);
      }

      if (random().nextInt(refreshEveryNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: refresh @ " + (i+1) + " docs");
        reindexer.mgr.maybeRefresh();
        DirectoryReader r = reindexer.mgr.acquire();
        if (DEBUG) System.out.println("TEST: got reader=" + r);
        try {
          // Check DVs, sorting, and range queries against the reindexed field:
          checkAllNumberDVs(r);
          IndexSearcher s = newSearcher(r);
          testNumericDVSort(s);
          testNumericRangeQuery(s);
        } finally {
          reindexer.mgr.release(r);
        }
        refreshEveryNumDocs = (int) (1.25 * refreshEveryNumDocs);
      }

      if (i > 0 && random().nextInt(10) == 7) {
        // Random delete:
        reindexer.w.deleteDocuments(new Term("id", ""+random().nextInt(i)));
      }

      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: commit @ " + (i+1) + " docs");
        reindexer.commit();
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }

      // Sometimes close & reopen writer/manager, to confirm the parallel segments persist:
      if (random().nextInt(commitCloseNumDocs) == 17) {
        if (DEBUG) System.out.println("TEST: close writer @ " + (i+1) + " docs");
        reindexer.close();
        reindexer = null;
        commitCloseNumDocs = (int) (1.25 * commitCloseNumDocs);
      }
    }
    if (reindexer != null) {
      reindexer.close();
    }
  }
1294 
  /** Checks the default "number" DV field with multiplier 1, asserting on any mismatch. */
  private static void checkAllNumberDVs(IndexReader r) throws IOException {
    checkAllNumberDVs(r, "number", true, 1);
  }
1298 
1299   private static void checkAllNumberDVs(IndexReader r, String fieldName, boolean doThrow, int multiplier) throws IOException {
1300     NumericDocValues numbers = MultiDocValues.getNumericValues(r, fieldName);
1301     int maxDoc = r.maxDoc();
1302     boolean failed = false;
1303     long t0 = System.currentTimeMillis();
1304     for(int i=0;i<maxDoc;i++) {
1305       Document oldDoc = r.document(i);
1306       long value = multiplier * Long.parseLong(oldDoc.get("text").split(" ")[1]);
1307       if (value != numbers.get(i)) {
1308         System.out.println("FAIL: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i) + " numbers=" + numbers);
1309         failed = true;
1310       } else if (failed) {
1311         System.out.println("OK: docID=" + i + " " + oldDoc+ " value=" + value + " number=" + numbers.get(i));
1312       }
1313     }
1314     if (failed) {
1315       if (r instanceof LeafReader == false) {
1316         System.out.println("TEST FAILED; check leaves");
1317         for(LeafReaderContext ctx : r.leaves()) {
1318           System.out.println("CHECK LEAF=" + ctx.reader());
1319           checkAllNumberDVs(ctx.reader(), fieldName, false, 1);
1320         }
1321       }
1322       if (doThrow) {
1323         assertFalse("FAILED field=" + fieldName + " r=" + r, failed);
1324       } else {
1325         System.out.println("FAILED field=" + fieldName + " r=" + r);
1326       }
1327     }
1328   }
1329 
1330   private static void testNumericDVSort(IndexSearcher s) throws IOException {
1331     // Confirm we can sort by the new DV field:
1332     TopDocs hits = s.search(new MatchAllDocsQuery(), 100, new Sort(new SortField("number", SortField.Type.LONG)));
1333     NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
1334     long last = Long.MIN_VALUE;
1335     for(ScoreDoc scoreDoc : hits.scoreDocs) {
1336       long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
1337       assertTrue(value >= last);
1338       assertEquals(value, numbers.get(scoreDoc.doc));
1339       last = value;
1340     }
1341   }
1342 
1343   private static void testNumericRangeQuery(IndexSearcher s) throws IOException {
1344     NumericDocValues numbers = MultiDocValues.getNumericValues(s.getIndexReader(), "number");
1345     for(int i=0;i<100;i++) {
1346       // Confirm we can range search by the new indexed (numeric) field:
1347       long min = random().nextLong();
1348       long max = random().nextLong();
1349       if (min > max) {
1350         long x = min;
1351         min = max;
1352         max = x;
1353       }
1354 
1355       TopDocs hits = s.search(NumericRangeQuery.newLongRange("number", min, max, true, true), 100);
1356       for(ScoreDoc scoreDoc : hits.scoreDocs) {
1357         long value = Long.parseLong(s.doc(scoreDoc.doc).get("text").split(" ")[1]);
1358         assertTrue(value >= min);
1359         assertTrue(value <= max);
1360         assertEquals(value, numbers.get(scoreDoc.doc));
1361       }
1362     }
1363   }
1364 
  // TODO: maybe the leading id could be further restricted?  It's from StringHelper.idToString:
  // Matches parallel-index sub-directory names of the form <segIDString>_<schemaGen>;
  // group(1) captures the gen.
  static final Pattern SEG_GEN_SUB_DIR_PATTERN = Pattern.compile("^[a-z0-9]+_([0-9]+)$");
1367 
1368   private static List<Path> segSubDirs(Path segsPath) throws IOException {
1369     List<Path> result = new ArrayList<>();
1370     try (DirectoryStream<Path> stream = Files.newDirectoryStream(segsPath)) {
1371       for (Path path : stream) {
1372         // Must be form <segIDString>_<longGen>
1373         if (Files.isDirectory(path) && SEG_GEN_SUB_DIR_PATTERN.matcher(path.getFileName().toString()).matches()) {
1374           result.add(path);
1375         }
1376       }
1377     }
1378 
1379     return result;
1380   }
1381 
1382   // TODO: test exceptions
1383 }